feat: add support for parquet content defined chunking options#21110
feat: add support for parquet content defined chunking options#21110kszucs wants to merge 8 commits intoapache:mainfrom
Conversation
47936bb to
9d757a2
Compare
This is done in So once that PR is merged this one should be good to go |
6465b79 to
4efab1d
Compare
|
@alamb please review |
|
Thanks @kszucs -- I think we should also have a ticket for this one so that we can explain the feature from a user perspective |
alamb
left a comment
There was a problem hiding this comment.
Thanks @kszucs -- this looks good to me other than it seems to be missing an "end to end" test -- aka writing / reading actual parquet files using these options to make sure everything is wired up correctly
Can you please add some .slt based tests ?
https://github.com/apache/datafusion/blob/main/datafusion/sqllogictest/README.md
Perhaps something similar to https://github.com/apache/datafusion/blob/main/datafusion/sqllogictest/test_files/encrypted_parquet.slt ?
| | datafusion.execution.parquet.allow_single_file_parallelism | true | (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. | | ||
| | datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | ||
| | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | ||
| | datafusion.execution.parquet.use_content_defined_chunking | NULL | (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When `Some`, CDC is enabled with the given options; when `None` (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. | |
There was a problem hiding this comment.
why is it marked "Experimental"?
There was a problem hiding this comment.
It was originally contributed to parquet-cpp=21.0.0 as an experimental feature in order to allow us breaking the API but hasn't actually changed since then, now we are at 23.0.0.
I'm not anticipating any API breaking changes in the future, the only thing I may adjust are the default parameters. The current 256KiB-1MiB chunk sizes were optimized for the parquet-cpp defaults with 1MiB page sizes, but since then parquet-cpp has switched to similar row number based page splitting as parquet-rs does. Switching to smaller default chunk sizes may further improve the deduplication efficiency, just needs some experimentation with the page size distribution via https://github.com/huggingface/dataset-dedupe-estimator.
I can remove the experimental flag in this PR or when graduating the CDC feature in the parquet-cpp implementation.
fd056be to
9b548ea
Compare
… clean up proto handling - Change `CdcOptions::norm_level` from `i64` to `i32` to match parquet's type, replacing `config_field!(i64)` with `config_field!(i32)` and removing all now-unnecessary `as i32`/`as i64` casts - Add validation in `into_writer_properties_builder` for invalid CDC config: `min_chunk_size == 0` and `max_chunk_size <= min_chunk_size`, returning a proper `DataFusionError::Configuration` instead of panicking in the parquet layer - Add explanatory comments on why proto zero-value handling is asymmetric between chunk sizes (0 is invalid) and `norm_level` (0 is valid default) - Remove extra blank line in `ParquetOptions` config block - Remove unused `parquet` feature from `datafusion-proto-common/Cargo.toml` - Add three validation tests for the new error paths
- Add parquet_cdc.slt with 6 end-to-end tests: write parquet files with CDC enabled/disabled, read back and verify correctness across various data types, sizes, and CDC configurations. - Allow setting use_content_defined_chunking to 'true'/'false' to enable with defaults or disable, via a specific ConfigField impl for Option<CdcOptions>. - CdcOptions uses an inherent default() method instead of the Default trait to avoid the blanket Option<F> ConfigField impl conflict.
Add content_defined_chunking.rs with 2 tests that write parquet files using ArrowWriter with CDC-enabled WriterProperties, then inspect file metadata to verify CDC is wired through correctly: - cdc_data_round_trip: write/read 5000 rows, verify count and range - cdc_affects_page_boundaries: compare column uncompressed sizes between CDC and non-CDC writes to verify CDC changes the page layout Also fix clippy warning on CdcOptions::default() inherent method.
Rationale for this change
Expose the new Content-Defined Chunking feature from parquet-rs apache/arrow-rs#9450
What changes are included in this PR?
New parquet writer options for enabling CDC.
Are these changes tested?
In-progress.
Are there any user-facing changes?
New config options.
Depends on the 58.1 arrow-rs release.